{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 课程目标\n",
    "1. 单向结构面临的问题\n",
    "2. 集群架构面临的问题\n",
    "3. Hodoop集群架构\n",
    "4. 冗余化数据存储\n",
    "5. 分布式文件系统"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 海量数据例子--Google\n",
    "* 100亿个网页\n",
    "* 平均网页大小 = 20KB\n",
    "* 100亿 * 20KB 200TB\n",
    "* 磁盘读取带宽 = 50MB/sec\n",
    "* 读取数据所需时间 = 400万秒 = 46天\n",
    "* 后续的数据处理与操作花费的时间更多\n",
    "\n",
    "## 集群架构\n",
    "* 每一个rack（机架服务器）包含16到64个Linux节点\n",
    "* 2011年据统计Goolge约有100万台机器\n",
    "\n",
    "## 集群计算需要面对的问题1\n",
    "### 节点故障：\n",
    "* 1000台服务器的集群=》平均故障率1次/天\n",
    "* 100万台服务器的集群=》平均故障率1000次/天\n",
    "\n",
    "### 如何保持数据的持续行\n",
    "* 某点节点故障的情形下不影响数据的使用\n",
    "* 长时间集群的运算，如何应对节点的故障\n",
    "\n",
    "## 集群计算需要面对的问题2\n",
    "### 网络带宽瓶颈：\n",
    "* 网络带宽 = 1 Gbps\n",
    "* 移动10TB数据需要花费将近一天\n",
    "\n",
    "### 分布式编程非常复杂：\n",
    "* 需要一个简单的模型能够隐去所有的复杂性\n",
    "\n",
    "## Hadoop分布集群\n",
    "* hadoop是依据mapreduce的原理，用Java语言实现的分布式处理机制\n",
    "* Hadoop是一个能够对大量数据进行分布处理的软件框架，实现了Google的MapReduce编程模型和框架，能够把应用程序分割成许多的小的工作单元，并把这些单元放到任何集群节点上执行\n",
    "* MapReduce是Hadoop中的一个数据运算核心模块\n",
    "\n",
    "## Map-Reduce集群运算问题的解决方案\n",
    "* 在多节点上冗余地存储数据，以保证数据的持续性\n",
    "* 将计算移向数据端，以最大程度减少数据移动\n",
    "* 简单的程序模型，隐藏所有的复杂度\n",
    "\n",
    "## 冗余化数据存储结构\n",
    "### 分布式文件存储系统：\n",
    "* 提供全局的文件命名空间，冗余度和可获取性\n",
    "* 例如Google的GFS,hadoop的HDFS\n",
    "\n",
    "### 典型的应用场景与模型：\n",
    "* 超大级别的数据量（100GB到100TB级别）\n",
    "* 数据很少被全部替换\n",
    "* 最常见的操作为读取和追加数据\n",
    "\n",
    "## 分布式文件系统\n",
    "* 数据以“块状”形式在多台机器上存储\n",
    "* 每个数据块都会重复地在多台机器上存储\n",
    "* 保证数据的持续性和随时可取性\n",
    "* 服务器块同时也用作计算服务器\n",
    "* 把运算挪向数据处\n",
    "### 服务器块：\n",
    "* 文件被分作16-64MB大小的连续块\n",
    "* 每个文件块会被重复的存储2到3次\n",
    "* 尽量保证重复的数据库在不同的机架上\n",
    "### 主节点\n",
    "* Hadoop的HDFS里叫做Name节点\n",
    "* 存储元数据记录文件存储结构和地址\n",
    "* 最常见的操作为读取和追加数据\n",
    "### 文件访问的客户端库：\n",
    "* 询问主节点以获取块服务器地址\n",
    "* 直接连接相应服务器块获取数据\n",
    "\n",
    "## Hadoop组成\n",
    "* Hadoop Common\n",
    "### 工具包，为其他hadoop模块提供基础设施\n",
    "* Hadoop HDFS\n",
    "### 分布式文件系统，对海量数据存储\n",
    "* Map-Reduce\n",
    "### 分布式处理策略，计算模型，对海量数据处理\n",
    "* Yarn\n",
    "### 分布式资源管理，调度\n",
    "\n",
    "## 编程模型：Map-Reduce\n",
    "* 手头上有一个超大的文本文件\n",
    "* 需要统计每个文本中的词出现的次数\n",
    "* 实际应用场景\n",
    "   1. 从web服务器日志中找到高频热门url\n",
    "   2. 搜索词统计\n",
    "   3. 区分垃圾邮件和短信\n",
    "   4. 舆情分析（正负面评论）\n",
    "\n",
    "# 场景1\n",
    "* 文件本身太大无法全部载入内存\n",
    "* 所有词和频次对<word，count>都超出了内存大小\n",
    "* 运算速度缓慢\n",
    "\n",
    "## Map-Reduce：总览\n",
    "## Map\n",
    "* 逐个文件逐行扫描\n",
    "* 扫描的同时抽取出我们感兴趣的内容（Keys）\n",
    "### Group by key\n",
    "* 排序和洗牌\n",
    "### Reduce\n",
    "* 聚会、总结、过滤或转换\n",
    "* 写入结果\n",
    "## Map和Reduce函数要根据具体问题具体实现\n",
    "\n",
    "## Map-Reduce：Reduce步骤\n",
    "### 输入：一些键值对\n",
    "### 编程人员需要定义两个函数：\n",
    "* Map（k、v）-><k',v'>*\n",
    "  ### 对一个键值对输入产生一序列中间键值对\n",
    "  ### Map函数将对所有键值对操作\n",
    "* Reduce（k',<v'>*>-><k',v' '>*\n",
    "  ### 所有有相同key，k'的值v'被reduce到一起\n",
    "  ### Reduce函数对每一个不同的Key k'进行操作\n",
    "  \n",
    "## 使用Map-Reduce统计词频\n",
    "* Map（key，value）:\n",
    " ### #key:文档名称；   value：文档的文本内容\n",
    "* Reduce（key，values）:\n",
    " ### #key：一个单词；  value：一个计数的迭代器\n",
    " \n",
    "# 场景2-HOST大小\n",
    "* 假设我们有一个大型网络语料库元数据文件格式如下：\n",
    "  ### 每条记录格式都是：（URL，size，date...）\n",
    "* 对于每一个Host，获取其字节大小\n",
    "\n",
    "### Map\n",
    "* 顺序扫描，对每一条记录，生产键值对（hostname（URL），size）\n",
    "### Reduce\n",
    "* 对相同host的键值对的值（字节数）进行求和\n",
    "\n",
    "## Map-Reduce：环境\n",
    "### 运行Map-Reduce模型，还需要hadoop环境解决\n",
    "* 对原始数据进行区分（Patition）\n",
    "* 调度程序在一系列的机器集群上都并行运行\n",
    "* 执行中间过程的group by key步骤\n",
    "* 处理运行过程中的突发节点故障\n",
    "* 处理并运行过程中的节点和节点之间的通信\n",
    "\n",
    "## 数据流\n",
    "* 输入和输出都被存储在分布式文件系统HDFS上\n",
    "* 实际调度操作时，调度器会尽可能将map任务移至靠近数据物理存储的节点上\n",
    "* 中间结果将会被存储在map和reduce操作的本地文件系统上\n",
    "* 实际运行过程中，一个Map-Reduce产生的结构，很可能作为另一个Map-Reduce任务的输入\n",
    "\n",
    "## 主节点的协调功能\n",
    "* 主节点主要负责系统的协调\n",
    "* 任务状态：等待初始、进行中、完成\n",
    "* 一旦有能工作的worker，待初始任务被调度运行\n",
    "* 一个Map任务完成后，它会向主节点发送它的产生的R个中间文件的位置和大小，每个文件对应一个reducer\n",
    "* 主节点将这些信息传送至reducer\n",
    "\n",
    "## 节点故障\n",
    "### Map任务节点故障\n",
    "* 所有运行中和已经完成的map任务，都被重置为待初始\n",
    "* 所有这些待初始Map任务，将重新被分配到能工作的节点worker\n",
    "### Reduce任务节点故障\n",
    "* 只有运行中而未完成的reduce任务被设定为待初始\n",
    "* 这些待初始reduce任务被重新分配至其他worker上\n",
    "### 主节点故障\n",
    "* 整个Map-Reduce任务中断，同时通知客户端管理员\n",
    "\n",
    "## 启动多少个Map和Reduce任务呢\n",
    "* M个Map任务和R个Reduce任务\n",
    "* 实际操作经验法则：\n",
    "  ### 通常情况下我们会让M远大于集群中的节点数\n",
    "  ### 通常设置一个分布文件系统块对应一个Map任务\n",
    "  ### 提升动态加载平衡，同时加速节点故障时的任务恢复\n",
    "* 通常R比M要小\n",
    "  ### 因为输出要分布在R文件上\n",
    "  \n",
    "## 动态添加map和reduce的大小，增加并行度\n",
    "* map是配置mapred.max.split.size，来定义map处理文件的大小，默认是256000000字段，换算就是256M。如果想增加map的并行度，那么就是减少map处理文件的大小set\n",
    "### mapred.max.split.size=xxx（更小的字节）\n",
    "\n",
    "* reduce和map是一致的，修改hive.exec.reducers.bytes.per.reducer这个参数，通过控制这个来定义一个reduce处理文件的大小hive.exec.reducers.bytes.per.reducer\n",
    "\n",
    "## 改进：combiners\n",
    "* 以词频统计为例\n",
    "### 合并器（Combiner）预先合并了单个mapper（单个节点）中键值对\n",
    "\n",
    "# 总结\n",
    "* reduce需要写函数，map有时候都不用写\n",
    "* map工作主要修改key，reduce主要修改values\n",
    "* 对已有的算法进行map-reduce化\n",
    "* map对一个键值对输入产生一序列中间键值对\n",
    "* map函数将对所有输入键值对操作\n",
    "* 相同的key值v被reduce放一起，Reduce函数对每一个不同的key进行操作\n",
    "* map和reduce属于分治思想，通过hash分桶来处理，map是发散的过程，reduce是收敛的过程\n",
    "* map任务数目要远大于Reduce\n",
    "* map-reduce会有输入和输出，输出后再次进入map-reduce，如此循环迭代，在磁盘级别的操作，所以开销会很大，spark是在内存级别的操作，所有对内存开销会很大，但速度很快\n",
    "* spark稳定不如map，spark只读一次\n",
    "* map-reduce主要做特征的转换，数据的提取，转换，处理，写入\n",
    "* 做特征的用map，reduce，导出的特征用于机器学习训练的用spark建模，用hadoop streaming方便任何语言编写map-reduce\n",
    "\n",
    "\n",
    "# Spark和Hive介绍\n",
    "\n",
    "## Hive\n",
    "* 在Hadoop的Map-Reduce之上提供的类SQL数据提取操作功能\n",
    "\n",
    "## Spark\n",
    "* 分布式计算框架， 是Map-Reduce替代方案，兼容HDFS和Hive\n",
    "* 可兼容hadoop生态，弥补Mapduce不足\n",
    "\n",
    "## Hive介绍\n",
    "* Hive是基于Hadoop的一个数据仓库工具，可以将结构化的数据文件映射为一张数据库表，并提供简单的sql查询功能，可以将sql语句转换为MapReduce任务进行运行。其优点是学习成本低，可以通过类SQL语句快速实现简单的MapReduce统计，十分适合数据仓库的统计分析\n",
    "## Hive架构介绍\n",
    "* 用户接口三个：CLI、Client和WUI。在启动Client模式的时候，需要指出Hive Server在哪个节点启动，WUI是通过浏览器访问Hive\n",
    "* Hive将元数据存储在数据库中，如mysql。Hive中的元数据包括表的名字，表的列和分区及其属性，表的属性，表的数据所在目录等。\n",
    "* HQL生成的查询计划存储在HDFS中，并在随后有MapReduce调用执行。\n",
    "* Hive的数据存储在HDFS中，大部分的查询，计算由MapReduce完成\n",
    "## Hive架构\n",
    "* 无hive：使用者-> mapreduce->hadoop数据（可能需要会mapreduce）\n",
    "* 有hive：使用者->HQL(SQL)->hive->mapreduce->hadoop数据（使用SQL）\n",
    "\n",
    "## 关联规则介绍\n",
    "* 数据挖掘是一项从大量的记录数据中提取有价值的、人们感兴趣的知识，这些知识是隐含的、事先未知的有用信息，提取的知识一般可表示为概念（Concepts）、规则（Rules）、规律（Regular ides）、模式（Patterns）等形式。\n",
    "\n",
    "* 规则：样本和样本之间的关联性\n",
    "* 模式：通过特征X，经过函数f得到结构y\n",
    "\n",
    "* 关联规则是当前数据挖掘研究的主要方法之一，它反映一个事物与其他事物之间的相互依存性和关联性\n",
    "* 典型的关联规则发现问题是对超市中的货蓝数据（MarketBasker）进行分析。通过发现顾客放入货蓝中的不同商品之间的关系来分析顾客的购买习惯。\n",
    "\n",
    "## 关联规则基本概念\n",
    "* 每一个样本叫一个项目\n",
    "* 一个顾客购买商品的购物车（购物单），项目的组合叫事务\n",
    "* 事务中有意义的项目集合叫做项集，比如面包和牛奶，就是二项集\n",
    "* 我们要挖掘的是项集\n",
    "\n",
    "* 1000个人购物，1000个购物单，牛奶在购物单中出现的次数叫支持度\n",
    "* 当支持度高到一定程度，才会观测出有意义的信息和规则，设定一个阈值\n",
    "\n",
    "* 项集A在事务数据库D中出现的次数占D中总事务的百分比叫做项集的支持度。如果项集的支持度超过用户给定的最小支持度阈值，就称该项集是频繁项集（或频集）\n",
    "\n",
    "* 关联规则是形如X=>Y的逻辑蕴含式，其中X∈I，Y∈I，且X∩Y=无\n",
    "* 如果事务数据库D中有s%的事务包含X∪Y，则称关联规则X=>Y的支持率为s%\n",
    "* 关联规则的信任度为support（X∪Y）/support（X）也就是：\n",
    "    ### support（X=>Y)=P(X∪Y)\n",
    "    ### confidence（X=>Y)=P(X | Y)\n",
    "    \n",
    "## 支持度和信任度理解\n",
    "* 支持度：关联规则产出的是规则，找到频繁项集，再找出有意义的规则，支持度确定哪些是经常出现的\n",
    "* 信任度：信任度产出规则，知道X和Y是一个频繁项目集，谁对谁的影响更大\n",
    "\n",
    "## 强关联规则\n",
    "* 强关联规则就是支持度和信任度分别满足用户给定阈值的规则\n",
    "* 例：\n",
    "   交易ID    购买的商品\n",
    "   2000       A,B,C\n",
    "   1000       A,C\n",
    "   4000       A,D\n",
    "   5000       B,E,F\n",
    "### 设置最小支持度为50%，最小可信度为50%，则可得到\n",
    "* A=>C(50%,66.6%)\n",
    "* C=>A(50%,100%)\n",
    "\n",
    "## 关联规则挖掘算法\n",
    "* Agrawal等人提出的AIS,Aprior和AprioriTid\n",
    "* Cumulate和Stratify，Houstsma等人提出的SETM\n",
    "* Park等人提出的DHP\n",
    "* Savaser等人的PARTITION\n",
    "* Han等人提出的不生成候选集直接生成频繁模式FPGrowth\n",
    "* 其中最有效和有影响的算法为Apriori，DHP和ARTIRION,FPGrowth\n",
    "\n",
    "## Apriori算法\n",
    "* Apriori算法命名源于算法使用了频繁项集性质的先验（Prior）知识\n",
    "* Apriori算法将发现关联规则的过程分为两个步骤：\n",
    "  1. 通过迭代，检索出事务数据库中的所有频繁项集，即支持度不低于用户设定的阈值的项集\n",
    "  2. 利用频繁项集构造出满足用户最小信任度的规则\n",
    "* 挖掘或识别出所有频繁项集是该算法的核心，占整个计算量的大部分\n",
    "\n",
    "## Apriori两个选择\n",
    "### 性质1：频繁项集的所有非空子集必为频繁项集\n",
    "### 性质2：非频繁项集的超集一定是非频繁的\n",
    "\n",
    "## Apriori原理执行步骤\n",
    "* 剪枝步：C(K)要过滤掉未达到阈值的项集\n",
    "* 连接步（组合）：为找L(k)，通过将（K-1）与自身连接产生候选k项集的集合C(k)\n",
    "* 剪枝->连接->剪枝->连接->剪枝->连接->...执行K次\n",
    "\n",
    "## Apriori算法实例\n",
    "* 现在A、B、C、D、E五种商品的交易记录表，试找出三种商品关联销售情况（k=3），最小支持度=50%\n",
    "### 交易号     商品代号\n",
    "      100      A,C,D\n",
    "      200      B,C,E\n",
    "      300      A,B,C,E\n",
    "      400      B,E\n",
    "## Apriori不足\n",
    "* 验证过程是性能瓶颈\n",
    "    1. 交易数据库可能非常大\n",
    "    2. 比如频集最多包含10个项，那么就需要扫描交易数据库10遍\n",
    "    3. 需要很大的I/0负载"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 第2题\n",
    "### cat doc.txt\n",
    "### cat doc.txt | python map.py\n",
    "### cat doc.txt | python map.py | sort\n",
    "### cat doc.txt | python map.py | sort | python reduce.py\n",
    "### cat doc.txt | python map.py | sort | python reduce.py >myout"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.4"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
